package com.example.demo.stream.source;

import cn.hutool.json.JSONUtil;
import com.example.demo.env.SpringContextHolder;
import org.apache.commons.collections.MapUtils;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import redis.clients.jedis.JedisCluster;

import java.util.Map;

/**
 * 加载MySql数据源内容
 *
 * @author: wbx
 */
public class RedisSource<Out> extends RichSourceFunction<Out> {

    /**
     * redis缓存key
     */
    private final String redisKey;

    private Class<Out> cls;


    public RedisSource(String redisKey, Class<Out> cls) {
        this.redisKey = redisKey;
        this.cls = cls;
    }


    /**
     * 查询数据库 封装到数据源中
     *
     * @param ctx 数据源收集器
     */
    @Override
    public void run(SourceContext<Out> ctx) {
        // 获取redis操作对象
        JedisCluster jedis = SpringContextHolder.getBean(JedisCluster.class);
        // 获取hash类型的数值
        Map<String, String> stringStringMap = jedis.hgetAll(redisKey);
        if(MapUtils.isNotEmpty(stringStringMap)) {
            stringStringMap.forEach((key ,val) -> {
                // 将json字符串转为对象
                Out o = JSONUtil.toBean(val, cls);
                // 封装到数据源中
                ctx.collect(o);
            });
        }

    }

    @Override
    public void cancel() {

    }
}
